在上次分析中,NewStorage()和NewREST()中生成核心storageInterface(即ETCD Helper)的是Decorator()。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| storageInterface, dFunc := opts.Decorator( opts.StorageConfig, cachesize.GetWatchCacheSizeByResource(cachesize.Roles), &rbac.Role{}, prefix, role.Strategy, newListFunc, storage.NoTriggerPublisher, ) storageInterface, dFunc := opts.Decorator( opts.StorageConfig, cachesize.GetWatchCacheSizeByResource(cachesize.Nodes), &api.Node{}, prefix, node.Strategy, newListFunc, node.NodeNameTriggerFunc)
所以storageDecorator是用来生成ETCD Helper的。我们先来看下/pkg/master/master.go中对storageDecorator的赋值:
1 2 3 4 5 6
| if c.EnableWatchCache { restOptionsFactory.storageDecorator = registry.StorageWithCacher } else { restOptionsFactory.storageDecorator = generic.UndecoratedStorage }
1 2 3 4 5 6 7 8 9 10 11
| func UndecoratedStorage( config *storagebackend.Config, capacity int, objectType runtime.Object, resourcePrefix string, scopeStrategy rest.NamespaceScopedStrategy, newListFunc func() runtime.Object, trigger storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) { return NewRawStorage(config) }
1 2 3 4 5 6 7 8 9 10 11
| func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc) { s, d, err := factory.Create(*config) if err != nil { glog.Fatalf("Unable to create storage backend: config (%v), err (%v)", config, err) } return s, d }
NewRawStorage()直接调用factory.Create()来生成底层的key-value存储。这里的factory.Create()可以根据配置文件来生成ETCD2 Helper或ETCD3 Helper,这些,将在下一篇分析中详细介绍。
- 调用NewRawStorage()生成ETCD Helper;
- 生成CacherConfig,其中CacherConfig中的Storage字段为刚生成好的ETCD Helper;
- NewCacherFromConfig()生成Cacher
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| func StorageWithCacher( storageConfig *storagebackend.Config, capacity int, objectType runtime.Object, resourcePrefix string, scopeStrategy rest.NamespaceScopedStrategy, newListFunc func() runtime.Object, triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) { s, d := generic.NewRawStorage(storageConfig) cacherConfig := storage.CacherConfig{ CacheCapacity: capacity, Storage: s, Versioner: etcdstorage.APIObjectVersioner{}, Type: objectType, ResourcePrefix: resourcePrefix, NewListFunc: newListFunc, TriggerPublisherFunc: triggerFunc, Codec: storageConfig.Codec, } if scopeStrategy.NamespaceScoped() { cacherConfig.KeyFunc = func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(resourcePrefix, obj) } } else { cacherConfig.KeyFunc = func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(resourcePrefix, obj) } } cacher := storage.NewCacherFromConfig(cacherConfig) destroyFunc := func() { cacher.Stop() d() } return cacher, destroyFunc }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| // Cacher is responsible for serving WATCH and LIST requests for a given // resource from its internal cache and updating its cache in the background // based on the underlying storage contents. // Cacher implements storage.Interface (although most of the calls are just // delegated to the underlying storage). type Cacher struct { // HighWaterMarks for performance debugging. // Important: Since HighWaterMark is using sync/atomic, it has to be at the top of the struct due to a bug on 32-bit platforms // See: https://golang.org/pkg/sync/atomic/ for more information incomingHWM HighWaterMark // Incoming events that should be dispatched to watchers. incoming chan watchCacheEvent sync.RWMutex // Before accessing the cacher's cache, wait for the ready to be ok. // This is necessary to prevent users from accessing structures that are // uninitialized or are being repopulated right now. // ready needs to be set to false when the cacher is paused or stopped. // ready needs to be set to true when the cacher is ready to use after // initialization. ready *ready // Underlying storage.Interface. storage Interface // Expected type of objects in the underlying cache. objectType reflect.Type // "sliding window" of recent changes of objects and the current state. watchCache *watchCache reflector *cache.Reflector // Versioner is used to handle resource versions. versioner Versioner // triggerFunc is used for optimizing amount of watchers that needs to process // an incoming event. triggerFunc TriggerPublisherFunc // watchers is mapping from the value of trigger function that a // watcher is interested into the watchers watcherIdx int watchers indexedWatchers // Handling graceful termination. stopLock sync.RWMutex stopped bool stopCh chan struct{} stopWg sync.WaitGroup }
- incoming: incoming channel中的Event会自动分发到各watcher中;
- storage: 底层的key-value存储,即ETCD Helper;
- watchCache: 与reflector配合使用,处理reflector中的Event;
- refstoragelector: 消费listerWatcher中的Event,并把Event并给watchCache处理;
- watchers: watcher数组;
- versioner: ETCD中数据版本管理结构体。
- 生成watchCache;
- 生成listerWatcher,listerWatcher对ETCD Helper进行了封装;
- 生成cacher;
- 设置watchCache中的OnEvent为cacher.processEvent;
- 启动cacher.dispatchEvents(),即分发方法;
- 返回cacher。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| func NewCacherFromConfig(config CacherConfig) *Cacher { watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc) listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc) if obj, ok := config.Type.(runtime.Object); ok { if err := runtime.CheckCodec(config.Codec, obj); err != nil { panic("storage codec doesn't seem to match given type: " + err.Error()) } } cacher := &Cacher{ ready: newReady(), storage: config.Storage, objectType: reflect.TypeOf(config.Type), watchCache: watchCache, reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0), versioner: config.Versioner, triggerFunc: config.TriggerPublisherFunc, watcherIdx: 0, watchers: indexedWatchers{ allWatchers: make(map[int]*cacheWatcher), valueWatchers: make(map[string]watchersMap), }, incoming: make(chan watchCacheEvent, 100), stopCh: make(chan struct{}), } watchCache.SetOnEvent(cacher.processEvent) go cacher.dispatchEvents() stopCh := cacher.stopCh cacher.stopWg.Add(1) go func() { defer cacher.stopWg.Done() wait.Until( func() { if !cacher.isStopped() { cacher.startCaching(stopCh) } }, time.Second, stopCh, ) }() return cacher }
processEvent()的作用很简单,就是把watchCacheEvent放入Cacher的incoming channel。现在我们有了incoming channel中的生产者。
1 2 3 4 5 6 7 8
| func (c *Cacher) processEvent(event watchCacheEvent) { if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) { glog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.objectType.String(), curLen) } c.incoming <- event }
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| func (c *Cacher) dispatchEvents() { for { select { case event, ok := <-c.incoming: if !ok { return } c.dispatchEvent(&event) case <-c.stopCh: return } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| func (c *Cacher) dispatchEvent(event *watchCacheEvent) { triggerValues, supported := c.triggerValues(event) timeout := time.Duration(250) * time.Millisecond c.Lock() defer c.Unlock() for _, watcher := range c.watchers.allWatchers { watcher.add(event, &timeout) } if supported { for _, triggerValue := range triggerValues { for _, watcher := range c.watchers.valueWatchers[triggerValue] { watcher.add(event, &timeout) } } } else { for _, watchers := range c.watchers.valueWatchers { for _, watcher := range watchers { watcher.add(event, &timeout) } } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (watch.Interface, error) { watchRV, err := ParseWatchResourceVersion(resourceVersion) if err != nil { return nil, err } c.ready.wait() c.watchCache.RLock() defer c.watchCache.RUnlock() initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV) if err != nil { return newErrWatcher(err), nil } triggerValue, triggerSupported := "", false if matchValues := pred.MatcherIndex(); len(matchValues) > 0 { triggerValue, triggerSupported = matchValues[0].Value, true } chanSize := 10 if c.triggerFunc != nil && !triggerSupported { chanSize = 1000 } c.Lock() defer c.Unlock() forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported) watcher := newCacheWatcher(watchRV, chanSize, initEvents, filterFunction(key, pred), forget) c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported) c.watcherIdx++ return watcher, nil }
1 2 3 4 5 6 7 8 9
| type cacheWatcher struct { sync.Mutex input chan watchCacheEvent result chan watch.Event filter filterObjectFunc done chan struct{} stopped bool forget func(bool) }
1 2 3 4 5 6 7 8 9 10 11 12 13
| func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []watchCacheEvent, filter filterObjectFunc, forget func(bool)) *cacheWatcher { watcher := &cacheWatcher{ input: make(chan watchCacheEvent, chanSize), result: make(chan watch.Event, chanSize), done: make(chan struct{}), filter: filter, stopped: false, forget: forget, } go watcher.process(initEvents, resourceVersion) return watcher }
process()会消费input channel中的watchCacheEvent,并调用sendWatchCacheEvent()进行处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| func (c *cacheWatcher) process(initEvents []watchCacheEvent, resourceVersion uint64) { defer utilruntime.HandleCrash() const initProcessThreshold = 500 * time.Millisecond startTime := time.Now() for _, event := range initEvents { c.sendWatchCacheEvent(&event) } processingTime := time.Since(startTime) if processingTime > initProcessThreshold { objType := "<null>" if len(initEvents) > 0 { objType = reflect.TypeOf(initEvents[0].Object).String() } glog.V(2).Infof("processing %d initEvents of %s took %v", len(initEvents), objType, processingTime) } defer close(c.result) defer c.Stop() for { event, ok := <-c.input if !ok { return } if event.ResourceVersion > resourceVersion { c.sendWatchCacheEvent(&event) } } }
sendWatchCacheEvent()会把watchCacheEvent转换成普通Event,并把Event放入到result channel。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) { curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.Object) oldObjPasses := false if event.PrevObject != nil { oldObjPasses = c.filter(event.Key, event.PrevObject) } if !curObjPasses && !oldObjPasses { return } object, err := api.Scheme.Copy(event.Object) if err != nil { glog.Errorf("unexpected copy error: %v", err) return } var watchEvent watch.Event switch { case curObjPasses && !oldObjPasses: watchEvent = watch.Event{Type: watch.Added, Object: object} case curObjPasses && oldObjPasses: watchEvent = watch.Event{Type: watch.Modified, Object: object} case !curObjPasses && oldObjPasses: watchEvent = watch.Event{Type: watch.Deleted, Object: object} } select { case <-c.done: return default: } select { case c.result <- watchEvent: case <-c.done: } }
add()可以把watchCacheEvent放到input channel中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| func (c *cacheWatcher) add(event *watchCacheEvent, timeout *time.Duration) { select { case c.input <- *event: return default: } startTime := time.Now() t, ok := timerPool.Get().(*time.Timer) if ok { t.Reset(*timeout) } else { t = time.NewTimer(*timeout) } defer timerPool.Put(t) select { case c.input <- *event: stopped := t.Stop() if !stopped { <-t.C } case <-t.C: c.forget(false) c.stop() } if *timeout = *timeout - time.Since(startTime); *timeout < 0 { *timeout = 0 } }
所有watcher必须实现ResultChan()返回result channel。
1 2 3 4
| func (c *cacheWatcher) ResultChan() <-chan watch.Event { return c.result }
watchCache先把Event转换成cacheWatchEvent,然后对cacheWatchEvent进行处理。watchCache本身是一个store cache,可以和reflector配合使用。其中cacheWatchEvent比Event多了PrevObject,为了不增加复杂度,具体分析略。
watchCache的Add(), Update(), Delete()都会调用processEvent():
- 生成watchCacheEvent;
- 调用onEvent()
- 把watchCacheEvent进行缓存。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error { key, err := w.keyFunc(event.Object) if err != nil { return fmt.Errorf("couldn't compute key: %v", err) } elem := &storeElement{Key: key, Object: event.Object} w.Lock() defer w.Unlock() previous, exists, err := w.store.Get(elem) if err != nil { return err } var prevObject runtime.Object if exists { prevObject = previous.(*storeElement).Object } watchCacheEvent := watchCacheEvent{ Type: event.Type, Object: event.Object, PrevObject: prevObject, Key: key, ResourceVersion: resourceVersion, } if w.onEvent != nil { w.onEvent(watchCacheEvent) } w.updateCache(resourceVersion, &watchCacheEvent) w.resourceVersion = resourceVersion w.cond.Broadcast() return updateFunc(elem) }
这里的onEvent就是Cacher的processEvent()方法,可以把watchCacheEvent放入到Cacher的incoming channel中。
reflector可以消费ListWatcher中的Event,并把Event通过调用cache的Add(), Update()等操对Event进行缓存并处理。具体reflector在以后介绍。
这次分析了两个ETCD Helper生成的入口函数:generic.UndecoratedStorage和registry.StorageWithCacher。
其中generic.UndecoratedStorage直接返回ETCD Helper。
而registry.StorageWithCacher中定义了Cacher,cacheWatcher,watchCache。Cacher先生成ETCD Helper,然后通过reflector机制把ETCD Helper的Event使用watchCache进行缓存并处理,处理的过程就是把Event放入到Cacher的incoming channel中。Cacher维护有一个dispatchEvents controller把incoming中的Event分发到各cacheWatcher,即放入到cacheWatcher中的input channel。cacheWatcher中有process controller把input中的Event放到result channel供外部消费。